今天寫 ng conf 2019 另一篇,感覺也是跟 custom operator 有關的
ng conf 2019
How To Build Your Own RxJS Operators | Ben Lesh & Tracy Lee
https://www.youtube.com/watch?v=E6R_1QB8q4o&list=PLOETEcp3DkCpimylVKTDe968yNmNIajlR&index=51
開頭先由Tracy Lee介紹一些新東西
import { fromFetch } from 'rxjs/fetch';
import { mergeMap } from 'rxjs/operators';
fromFetch('/url/data.json').pipe(
    mergeMap(res=>res.json())
)                     ^^^^^^ 內建的
.subscribe(data=>console.log(data));
使用dictionaries of observable去得到objects stream
從observables,用name去取值
import { forkJoin,of} from 'rxjs';
forkJoin({
    foo: of('value'),
    bar: of(123),
})
.subscribe(x=>console.log(x));
// { foo: "value, bar: 123 }
import {scheduled, asapScheduler} from 'rxjs';
可以把array傳給scheduler
scheduled([1,2,3], asapScheduler)
    .subscribe(x=>console.log(x));
超棒的網站
再來由 Ben Lesh 進入正題
RxJS常常會串太長,例如:
(初學者寫成這樣就蠻棒的了,可以參考)
effects$=this.action$.pipe(
    concatMap(action=>{
        case 'LOAD':
            return this.http.get('/users').pipe(
                catchError(err=>{
                    console.warn('User load error:'+err.message);
                    return EMPTY;
                }),
                concatMap(users=>{
                    return forkJoin(
                        users.map(user=>this.http.get('/users/${user.id}/profile').pipe(
                            catchError(err=>{
                                console.ware('Profile load error:'+err.message);
                                return EMPTY;
                            })
                        ))
                    ).pipe(
                        map(profiles=>users.map((user,i)=>{
                            {user,profile:profiles[i]}
                        }))
                    );
                });
        case 'DELETE':
            return this.http.get('/users/${action.payload}').pipe(
                catchError(err=>{
                    console.warn('User delete error:'+err.message);
                    return EMPTY;
                }),
...
對高手來說,這麼一長串是不好維護的,所以要怎麼改善呢?
// 所以每個operator的回傳型別都長這樣
(source:Observable<A>)=>Observable<B>
Observalbe<A>.pipe(
    (Observable<A>)=>Observable<B>,
    (Observable<B>)=>Observable<C>,
    (Observable<C>)=>Observable<D>,
); // returns Observable<D>
map =
    (fn:(A)=>(b)=>
    (Observable<A>)=>Observable<B>
用一個Higher-Order Function
自訂一個takeEveryNth()
import {filter} from 'rxjs/operators';
emits every nth value(從上游來的Observable資料流stream)
@param n the count to emit on
export function takeEveryNth(n:number){
           VVVVVV 不是重點
    return filter( (_,i) => i%n ===0 );
                   ^^^^^^^^^^^^^^^^^ Higher-Order Function
}
使用takeEveryNth()
import {of} from 'rxjs';
import {takeEveryNth} from './take-every-nth';
of(0,1,2,3,4,5,6,7,8,9).pipe(
    takeEveryNth(3)
)
.subscribe(x=>console.log(x))
// Emits: 0,3,6,9
相對比之下,後者有更高的可讀性,且可重複使用
filter((_,i)=>i%3===0) vs takeEveryNth(3)
除此之外,也可有利於測試
// 使用一個假的Observable
it('should take every nth value from an observable',()=>{
    const results=[];
    of(0,1,2,3,4,5,6,7)
        .pipe(takeEveryNth(3))
        .subscribe(x=>results.push(x));
    expect(results).toEqual([0,3,6]);
});
// 使用Subject
it('should take every nth value',()=>{
    const results=[];
    const source=new Subject<number>();
    source.pipe(takeEveryNth(3))
        .subscribe(x=>results.push(x));
    source.next(0);
    expect(results).toEqual([0]);
    source.next(1);
    source.next(2);
    source.next(3);
    expect(results).toEqual([0,3]);
});
data$=this.loadDataClicked$.pipe(
    switchMap(()=>
        this.http.get('https://api.github.com/users?per_page=5').pipe(
            // 散佈各地的error handling,可以抽出來
            catchError(err=>{
                console.log('Error:'+err.message);
                return EMPTY;
            })
            
            // 就可以變成logError();
        )
    )
)
extract the error handling
// Handles an error by logging to console.
export function logError(){
    return catchError(err=>{
        console.log('Error:'+err.message);
        return EMPTY;
    })
}
可以再往上抽一層,改寫http get request(有error handling)
// Takes any notification and triggers a new HTTP GET request
function toHttpGetLatest(http:HttpClient,url:string){
    return switchMap(()=>
        http.get(url).pipe(
            logError(),
        )
    )
}
改寫後,可讀性更高。對於測試,也可好抽離WebAPI,換成假資料
data$=this.loadDataCliecked$.pipe(
    toHttpGetLatest(this.http, 'https://api.github.com/users?per_page=5')
)
甚至,針對某Web API寫一個function
                                                                    V 預設值
export function getGithubUsers(http:HttpClient,{pageSize}={pageSize:5}){
    return toHttpGetLatest(http,'https://api.github.com/users?per_page=${pageSize}`);
                                                                       ^^^^^^^^^^^
}
最簡單的,就是一個function形式
// T 進來、R 回傳       VVVVVV 這個function傳入參數為T
function map<T,R>(fn:(value:T)=>R){
                  ^^^^^^^^^^^^^^^ function
    // 接收一個Observable,回傳一個新的Observable
    return (source:Observable<T>)=>new Observable<R>(
    
    // inside of that initalization function
    // pass source:Observable<T>的constructor
    subscriber=>{
        // 要去subscribe source observable
        return source.subscribe({
            // 新的observer,去實作map()的next,error,complete
            next:v=>subscriber.next(fn(v)),
                                    ^^ user丟進來的function要放哪?
            error:e=>subscriber.error(e),
            complete:()=>subscriber.complete(),
        });
    });
    
    );
}
上面fn,是使用者丟進來的function(user-provided function)
可以加一個try...catch
next:v=>{
    let result:R;
    try{
        result=fn(v); // 
    }catch(err){
        subscriber.error(err);
        return;
    }
    subscriber.next(result); // 
},
這邊Ben Lesh說明蠻多的,想了解原因的可以看影片
Observable.prototype.lift的介紹
看Observable裡,lift()的原始碼
class Observable<T>{
    lift(operator:Operator<T,R>){
        const result=new Observable<R>();
        result.operator=operator;
        result.source=this; // this就是呼叫我的那個物件
        return result;
    }
}
建議要看這篇別太在意包裝上的照片與實物之間的差異:RxJS
https://jonny-huang.github.io/angular/training/30_rxjs_5/
打包機:lift
回去查閱 Observable 內的 lift 方法,可以看到 lift 會建立一個新的 Observable 來包裝目前的 Observable 與 Operator 物件。
30 天精通 RxJS (07)
https://ithelp.ithome.com.tw/articles/10187248
在 RxJS 5 的實作中,其實每個 operator 是透過原來 observable 的 lift 方法來建立新的 observable,這個方法會在新回傳的 observable 物件內偷塞兩個屬性,分別是 source 與 operator,記錄原本的資料源跟當前使用的 operator。
其實 lift 方法還是用 new Observable(跟 create 一樣)。至於為什麼要獨立出這個方法,除了更好的封裝以外,主要的原因是為了讓 RxJS 5 的使用者能更好的 debug。關於 RxJS 5 的除錯方式,我們會專門寫一篇來講解!
我找不到OperatorApplicator是什麼東東,知道的大大歡迎留言告知
interface OperatorApplicator<T,R>{
    VVVV method
    call(
        subscriber: Subscriber<R>, source: Observable<T>
    ):TeardownLogic;
}
// 可以寫成function
operatorApplicator.call(subscriber,source)
function operatorApplicator(this:Subscriber,source:Observable){}
class MapOperatorAppl<T,R> implements OperatorApplicator<T,R>{
    constructor(private fn:(value:T)=>R){}
    
    call(subscriber:Subscriber<R>,source:Observable<T>){
                      VVVVVVVVV呼叫 Observable 的訂閱(subscribe)
        return source.subscribe(
                      ^^^^^^^^^ subscribe 只接收 Observer
        // 實作 Observer 介面
        {             
            next:v=>subscriber.next(this.fn(v)),
            error:e=>subscriber.error(e),
            complete:()=>subscriber.complete(),
        }
        );
    }
}
// RxJS 程式內的 Operator 物件與我們平常講的 Operator Function 是不一樣的東西
// 寫成function
export function map<T,R>(fn:(value:T)=>R){
    return (source:Observable<T>)=>source.lift(mapOperator(fn));
                                          ^^^^建立新的Observable
}
function mapOperator(fn:(value:T)=>R){
    return function mapLifted(this:Subscriber<R>,source:Observable<T>){
        // 都長得很像
        return source.subscribe({
            // next:v=>this.next(fn(v)),
            // 加上error handling
            next:v=>{
                let result:R;
                try{result=fn(v);}catch(e){this.error(e);return;}
                this.next(r);
            }
            error:e=>this.error(e),
            complete:()=>this.complete(),
        })
    }
}
@benlesh 介紹marble tests,並示範一些operator怎麼寫單元測試
但ben lesh建議不要過度時用 marble tests
Two types of operators to test
Kevin大大的文章都是精華呀!!
https://blog.kevinyang.net/2018/05/18/angualr-testing-delay/
RxJS 6 版以後,提供了一個 TestScheduler 可以讓我們來做 Observable 的測試
(測試計畫、測試排程器?)
import { TestScheduler } from 'rxjs/testing';
let rxTest:TestScheduler;
beforeEach(()=>{
    rxTest=new TestScheduler(assertDeepEquals);
});
function assertDeepEquals(a:any,b:any){
    expect(a).toEqual(b);
}
it('should map values',()=>{
    rxTest.run(({cold, expectObservable})=>{
        // Schedule a bunch of things on the test scheduler
        // 在test scheduler做一些安排
    }); // <-- Flush the test scheduler
});
it('should map values',()=>{
    rxTest.run(({cold,expectObservable})=>{
Each letter (or number) is an emission 
of that character as a string*(e.g. "a")
                           V          V complete notification
        const src=cold('---a---a---a--|');
                  ^^^^ subscribed才會送資料流
This test observable is "cold",
meaning it's not active until subscribed to.
(there's alse 'hot',but that's not important right now)
        const result='---b---b---b--|';
              ^^^^^^ 只是期望的結果,不是observable
              
        expectObservable(
        // Sets this observable up to be run in the TestScheduler at time `0`
            src.pipe(map(a=>'b'))
        ).toBe(result);
    });
});
// 如果遇到時間相關的,例如delay
it('should test delay,()=>{
    rxTest.run(({cold,time,expectObservable})=>{
        const src=cold('--a------b------c-----|');
        const t=time('----|'); // 假設一開始都會停4隔時間
        const result='-------a------b------c-----|';
        expectObservable(src.pipe(delay(t,rxTest)).toBe(result);
                                  ^^^^^^^^^^^^^^^
        
    });
});
it('should test takeUntil',()=>{
    rxTest.run(({cold,time,expectObservable})=>{
        // 要複製到筆記本上,才能看出圖
        const src=cold('---a------b------c-----|');
        const notifier=cold('------------x----|');
        const result='---a------b-|'; // 先把結果推算出來
        
        expectObservable(
            src.pipe(takeUntil(notifier))
                     ^^^^^^^^^^^^^^^^^^^
        ).toBe(result);
    });
});
it('should map values',()=>{
    rxTest.run(({cold,time,expectObservable})=>{
              VVVVVV You can pass in a values lookup 
        const values={a:200,b:40000};    VVVVVV 可以丟值進去
        const src=cold('---a---a---a--|',values);
        const result='  ---b---b---b--|'; // 先把結果推算出來
        
        expectObservable(
            src.pipe(map(a=>a*a))
                      VVVVVV
        ).toBe(result,values);
    });
});
When to use marble tests